Ibisを使ってみる | Hadoop Advent Calendar 2016 #21
こんにちは、小澤です。 この記事はHadoop Advent Calendar 21日目のものとなります。
前回はDeepleaning4J on Sparkを利用して、SparkでDeep Learningを行いました。
今回はIbisというものを使ってみたいと思います。
Ibisとは
ImpalaなどのSQL on Hadoopに対してPythonからDataFrame風に処理を実行できるライブラリのようです。 PythonのDataFrameのライブラリであるPandasと同じ作者が作っているようなので、そうあたりの親和性もいいのではないかと思い今回は使ってみたいと思います。
なぜこのようなライブラリが必要か
以下の図のような環境を考えてみます。
ここではDWHなどを挟まずにHadoop上にあるデータにSQLでアクセスしています。 また、データ抽出と分析の過程でそれぞれ別なものを利用しています。 この時、分析に使う環境では主にDataFrameの形式でデータを扱うことになります。DataFrameではSQLと似たような操作も可能なので、気になるのが「どこまでをSQLで記述して、どこからをDataFrameとして扱うか」ですね。 そこで、DataFrameの扱いに慣れている人のために、最初のSQLの段階からDataFrameに対する操作であるかのように扱うためのライブラリとしてあるのがIbisです。
使ってみる
分散処理環境を構築してImpalaなどに接続してみるのが一番いいかとは思うのですが、ここでは試しに動かしてみるのに最も手軽なSQLiteをデータのある環境として利用します。 内容としてはQuickstart on Crunchbase analysis using Ibis and SQLiteにあるものを動かしてみます。
インストールと接続
まずはインストールします
pip install ibis-framework
データベースへのアクセス
import ibis ibis.options.interactive = True con = ibis.sqlite.connect('../resources/crunchbase.db/crunchbase.db') con.list_tables() ['acquisitions', 'companies', 'investments', 'rounds']
テーブルの一覧が確認できました。
簡単な処理の実装
次にテーブルに関する情報と集約処理を行ってみます
# テーブル名をしていして、特定のテーブルを取得して情報を見る rounds = con.table('rounds') rounds.info() Table rows: 87161 Column Type Non-null # ------ ---- ---------- company_permalink string 87161 company_name string 87161 company_category_list string 83464 company_market string 80523 company_country_code string 81291 company_state_code string 59296 company_region string 81016 company_city string 80208 funding_round_permalink string 87161 funding_round_type string 87161 funding_round_code string 23648 funded_at string 87161 funded_month string 87147 funded_quarter string 87147 funded_year int32 87147 raised_amount_usd float 73406 # カラムを選択してcountを行う # SQLだと、count関数とgroup byのようなもの rounds.funding_round_type.value_counts() funding_round_type count 0 angel 4602 1 convertible_note 838 2 debt_financing 5869 3 equity_crowdfunding 2401 4 grant 1523 5 non_equity_assistance 69 6 post_ipo_debt 92 7 post_ipo_equity 425 8 private_equity 1864 9 product_crowdfunding 241 10 secondary_market 61 11 seed 22053 12 undisclosed 4128 13 venture 42995 # 別なテーブルに対して、nullものと、そうでないものそれぞれのカウントを行う expr = acquisitions.price_amount.isnull().name('has_price').value_counts() # executeでPandasのDataFrameとして取得 expr.execute() has_price count 0 0 3816 1 1 51424
データの型
Ibis内部で扱われているデータの型を見ています。
# 内部がテーブルになっている場合はTableExpr type(expr) ibis.expr.types.TableExpr # 1カラムだけの場合はArray companies = con.table('companies') type(companies.funding_total_usd) ibis.expr.types.FloatArray # 集約関数の結果など、1つの値になっている場合Scalar expr = companies.funding_total_usd.mean() type(expr) ibis.expr.types.DoubleScalar
executeを行うとTableExprはPandasにおけるDataFrame、ArrayはSeries、Scalarはnumpyの変数になるようです。
クロス集計テーブルの作成
次にクロス集計を行う例を見ていきます。 まず、timestampが入っているデータに対する処理が行える例が出ています。
funded_at = rounds.funded_at.cast('timestamp') funded_at.year().value_counts()[10:20] year count 0 1960 3 1 1973 1 2 1974 2 3 1979 1 4 1982 3 5 1983 1 6 1984 4 7 1985 6 8 1986 4 9 1987 6
ここでは明示的にキャストして年だけを抽出しています。 この仕組みを利用して以下のカラムを利用してたクロス集計を行います。
rounds.funding_round_code.value_counts() funding_round_code count 0 None 63513 1 A 11382 2 B 6548 3 C 3329 4 D 1530 5 E 608 6 F 201 7 G 45 8 H 5
まずは縦持ちの方で必要なデータを抽出します。
year = funded_at.year().name('year') expr = rounds[(rounds.funding_round_type == 'venture') & year.between(2000, 2015) & rounds.funding_round_code.notnull()] \ .group_by([year, 'funding_round_code']) \ .size() result = expr.execute() result[:10] year funding_round_code count 0 2000 A 79 1 2000 B 32 2 2000 C 10 3 2000 D 8 4 2000 E 2 5 2001 A 50 6 2001 B 30 7 2001 C 17 8 2001 D 4 9 2001 E 1
ここでちょっとしたDataFrame操作のテクニックが出てきます。 DataFrameに対する操作では、行数と同じサイズの真偽値の配列を渡してやることで配列中のTrueなっている行のみを抜き出すことができます。 その仕組みを利用して、expr= ... ではroundsにに対して3つの真偽値配列をandで渡すことで条件に一致する行のみを抜き出しています。
これを横持ちに変換することでクロス集計表を得ます。
pivoted = result.set_index(['year', 'funding_round_code']) \ .unstack('funding_round_code') \ .fillna(0) pivoted count funding_round_code A B C D E F G H year 2000 79.0 32.0 10.0 8.0 2.0 0.0 0.0 0.0 2001 50.0 30.0 17.0 4.0 1.0 0.0 0.0 0.0 2002 35.0 39.0 25.0 5.0 2.0 2.0 0.0 0.0 2003 68.0 45.0 14.0 13.0 1.0 0.0 1.0 0.0 2004 146.0 76.0 32.0 15.0 3.0 1.0 0.0 0.0 2005 513.0 334.0 176.0 67.0 26.0 6.0 0.0 0.0 2006 717.0 465.0 226.0 91.0 35.0 7.0 1.0 0.0 2007 956.0 583.0 281.0 110.0 49.0 7.0 1.0 0.0 2008 979.0 653.0 308.0 120.0 54.0 17.0 1.0 0.0 2009 753.0 531.0 290.0 147.0 55.0 28.0 0.0 0.0 2010 1013.0 598.0 369.0 149.0 52.0 18.0 2.0 0.0 2011 1250.0 700.0 334.0 175.0 60.0 18.0 5.0 0.0 2012 1242.0 610.0 345.0 184.0 69.0 16.0 7.0 0.0 2013 1606.0 796.0 377.0 185.0 81.0 38.0 6.0 0.0 2014 1757.0 952.0 471.0 223.0 108.0 36.0 18.0 5.0 2015 88.0 71.0 34.0 28.0 8.0 5.0 3.0 0.0
set_indexでyear→funding_round_codeの階層を持つインデックスを作成て、unstackでそれを縦横に変換しています。
Bucketを使う
最後のBucketを使ってみます。
funding_buckets = [0, 1000000, 10000000, 50000000, 100000000, 500000000, 1000000000] bucket = companies.funding_total_usd.bucket(funding_buckets, include_over=True) bucket.value_counts() unnamed count 0 NaN 12055 1 0.0 15965 2 1.0 15754 3 2.0 7839 4 3.0 1532 5 4.0 1022 6 5.0 88 7 6.0 37
bucketで指定した配列の値に応じて、companies.funding_total_usdが0から1000000の間であれば0、1000000から10000000の間であれば1のように数値が振られていきます。
次に各bucketごとの名前を設定していきます
bucket_names = ['0 to 1m', '1m to 10m', '10m to 50m', '50m to 100m', '100m to 500m', '500m to 1b', 'Over 1b'] counts = bucket.name('bucket').value_counts() labeled = counts.bucket.label(bucket_names) with_names = counts.mutate(bucket_names=labeled) with_names bucket count bucket_names 0 NaN 12055 None 1 0.0 15965 0 to 1m 2 1.0 15754 1m to 10m 3 2.0 7839 10m to 50m 4 3.0 1532 50m to 100m 5 4.0 1022 100m to 500m 6 5.0 88 500m to 1b 7 6.0 37 Over 1b
mutateを使って項目の追加をしている以外は新しい要素はないかと思います。
次に集計を行います。aggregateを利用して複数の集計を行っています。
metrics = companies.group_by(bucket.name('bucket')) \ .aggregate(count=companies.count(), total_funding=companies.funding_total_usd.sum()) \ .mutate(bucket_names=lambda x: x.bucket.label(bucket_names)) metrics bucket count total_funding bucket_names 0 NaN 12055 NaN None 1 0.0 15965 4.505177e+09 0 to 1m 2 1.0 15754 5.712283e+10 1m to 10m 3 2.0 7839 1.724166e+11 10m to 50m 4 3.0 1532 1.054132e+11 50m to 100m 5 4.0 1022 1.826600e+11 100m to 500m 6 5.0 88 5.804196e+10 500m to 1b 7 6.0 37 1.040123e+11 Over 1b
最後にテーブル化します。少々内容が複雑ですが、個々の要素はこれまで登場したものの組み合わせなので難しい点はないかと思います
joined = companies.mutate(bucket=bucket, status=companies.status.fillna('Unknown')) \ [(companies.founded_at > '2010-01-01') | companies.funding_total_usd.isnull()] \ .group_by(['bucket', 'status']) \ .size() \ .mutate(bucket_name=lambda x: x.bucket.label(bucket_names).fillna('Unknown')) table = joined.execute() table.set_index(['status', 'bucket', 'bucket_name']).unstack('status') count status Unknown acquired closed operating bucket bucket_name NaN Unknown 3284.0 511.0 458.0 7802.0 0.0 0 to 1m 209.0 145.0 477.0 8246.0 1.0 1m to 10m 109.0 254.0 119.0 5125.0 2.0 10m to 50m 20.0 65.0 13.0 1346.0 3.0 50m to 100m 1.0 10.0 1.0 152.0 4.0 100m to 500m 1.0 2.0 1.0 122.0 5.0 500m to 1b NaN NaN NaN 12.0 6.0 Over 1b NaN NaN NaN 5.0
生成されるSQL
ここまででIbisを使ったDataFrameによる操作のイメージはつかめたかと思います。
次に実際に裏側ではどのようなSQLが生成されているのかを見てみましょう。
print(ibis.impala.compile(joined))
を実行すると、下記のようなSQL文が表示されます。
SELECT *, isnull(CASE `bucket` WHEN 0 THEN '0 to 1m' WHEN 1 THEN '1m to 10m' WHEN 2 THEN '10m to 50m' WHEN 3 THEN '50m to 100m' WHEN 4 THEN '100m to 500m' WHEN 5 THEN '500m to 1b' WHEN 6 THEN 'Over 1b' ELSE NULL END, 'Unknown') AS `bucket_name` FROM ( SELECT `bucket`, `status`, count(*) AS `count` FROM ( SELECT `permalink`, `name`, `homepage_url`, `category_list`, `market`, `funding_total_usd`, isnull(`status`, 'Unknown') AS `status`, `country_code`, `state_code`, `region`, `city`, `funding_rounds`, `founded_at`, `founded_month`, `founded_quarter`, `founded_year`, `first_funding_at`, `last_funding_at`, CASE WHEN (`funding_total_usd` >= 0) AND (`funding_total_usd` < 1000000) THEN 0 WHEN (`funding_total_usd` >= 1000000) AND (`funding_total_usd` < 10000000) THEN 1 WHEN (`funding_total_usd` >= 10000000) AND (`funding_total_usd` < 50000000) THEN 2 WHEN (`funding_total_usd` >= 50000000) AND (`funding_total_usd` < 100000000) THEN 3 WHEN (`funding_total_usd` >= 100000000) AND (`funding_total_usd` < 500000000) THEN 4 WHEN (`funding_total_usd` >= 500000000) AND (`funding_total_usd` <= 1000000000) THEN 5 WHEN `funding_total_usd` > 1000000000 THEN 6 ELSE NULL END AS `bucket` FROM companies WHERE ((`founded_at` > '2010-01-01') OR `funding_total_usd` IS NULL) ) t1 GROUP BY 1, 2 ) t0
SQLを書くかDataFrameで操作するかについて
Ibisを使ってDataFrameを操作すると内部的にはSQLに変換されています。
これは利用する際はSQLを書かなくてもいいことを意味しています。
しかし、一方でSQLに慣れている人やプログラミングに詳しくないデータ分析者などの場合はSQLでかけたほうがいいと思うことも多かと思います。
これについてはSpark SQLとDataFrame APIなんかでも同じですが、どちらを利用するのが良いかというのは一概には決められないと思います。
必要に応じて場面ごとに使い分けれるようになるのが一番かと思います。
終わりに
今回はIbisというものを使ってみました。
DataFrameの操作に慣れている人にとっては非常に便利なライブラリとなっており、うまく活用していければ非常に強力なツールになってくれることと思います。
明日はHBaseについて書かせていただく予定です。
ぜひ、お楽しみに!